Skip to content

Commit

Permalink
feat(server): 完成reactor server开发,但测试过程中遇到个问题
Browse files Browse the repository at this point in the history
Signed-off-by: Trino <[email protected]>
  • Loading branch information
Trinoooo committed Jun 19, 2024
1 parent 52e2966 commit 4e4b730
Show file tree
Hide file tree
Showing 8 changed files with 776 additions and 3 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ module github.com/Trinoooo/eggie_kv
go 1.19

require (
github.com/bytedance/gopkg v0.0.0-20231219111115-a5eedbe96960
github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3
github.com/chzyer/readline v1.5.1
github.com/cloudwego/netpoll v0.6.1
github.com/luci/go-render v0.0.0-20160219211803-9a04cc21af0f
github.com/mitchellh/go-homedir v1.1.0
github.com/pkg/errors v0.9.1
github.com/spf13/viper v1.18.2
github.com/urfave/cli/v2 v2.26.0
go.uber.org/zap v1.26.0
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
github.com/bytedance/gopkg v0.0.0-20231219111115-a5eedbe96960 h1:t2xAuIlnhWJDIpcHZEbpoVsQH1hOk9eGGaKU2dXl1PE=
github.com/bytedance/gopkg v0.0.0-20231219111115-a5eedbe96960/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ=
github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3 h1:ZKUHguI38SRQJkq7hhmwn8lAv3xM6B5qkj1IneS15YY=
github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ=
github.com/chzyer/logex v1.2.1 h1:XHDu3E6q+gdHgsdTPH6ImJMIp436vR6MPtH8gP05QzM=
github.com/chzyer/logex v1.2.1/go.mod h1:JLbx6lG2kDbNRFnfkgvh4eRJRPX1QCoOIWomwysCBrQ=
github.com/chzyer/readline v1.5.1 h1:upd/6fQk4src78LMRzh5vItIt361/o4uq553V8B5sGI=
github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk=
github.com/chzyer/test v1.0.0 h1:p3BQDXSxOhOG0P9z6/hGnII4LGiEPOYBhs8asl/fC04=
github.com/chzyer/test v1.0.0/go.mod h1:2JlltgoNkt4TW/z9V/IzDdFaMTM2JPIi26O1pF38GC8=
github.com/cloudwego/netpoll v0.6.1 h1:Cjftvi6bmumsOijmuUFy6HqAUXMxAT3fKK96wsrm3XA=
github.com/cloudwego/netpoll v0.6.1/go.mod h1:kaqvfZ70qd4T2WtIIpCOi5Cxyob8viEpzLhCrTrz3HM=
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -29,6 +31,8 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
Expand Down
38 changes: 38 additions & 0 deletions storage/server/netpoll_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package server

import (
"context"
"github.com/cloudwego/netpoll"
"net"
)

// NetpollEventLoopServer 字节的NIO eventLoop 网络库
type NetpollEventLoopServer struct {
listener net.Listener
eventLoop netpoll.EventLoop
}

func NewNetpollEventLoopServer(addr string, handler netpoll.OnRequest) (*NetpollEventLoopServer, error) {
var srv = &NetpollEventLoopServer{}
var err error

srv.listener, err = netpoll.CreateListener("tcp", addr)
if err != nil {
return nil, err
}

srv.eventLoop, err = netpoll.NewEventLoop(handler)
if err != nil {
return nil, err
}

return srv, nil
}

func (els *NetpollEventLoopServer) Serve() error {
return els.eventLoop.Serve(els.listener)
}

func (els *NetpollEventLoopServer) Close() error {
return els.eventLoop.Shutdown(context.Background())
}
18 changes: 18 additions & 0 deletions storage/server/proactor_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package server

// ProactorServer 自实现的proactor网络模式服务器
// 或许后续可以实现个windows版本
type ProactorServer struct {
}

func NewProactorServer() *ProactorServer {
return &ProactorServer{}
}

func (ps *ProactorServer) Serve() {

}

func (ps *ProactorServer) Close() error {
return nil
}
90 changes: 90 additions & 0 deletions storage/server/pure_goroutine_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package server

import (
"context"
"log"
"net"
"sync"
)

// PureGoroutineServer 没用多路IO复用,纯goroutine并发
type PureGoroutineServer struct {
mutex sync.Mutex
serverTransport net.Listener
handler simpleHandler
stop chan struct{}
done sync.WaitGroup
}

func NewPureGoroutineServer(addr string, handler simpleHandler) (*PureGoroutineServer, error) {
var srv = &PureGoroutineServer{
handler: handler,
stop: make(chan struct{}),
}
var err error
srv.serverTransport, err = net.Listen("tcp", addr)
if err != nil {
return nil, err
}
return srv, nil
}

func (pgc *PureGoroutineServer) Serve() error {
for {
select {
case <-pgc.stop:
return nil
default:
}

conn, err := pgc.serverTransport.Accept()
if err != nil {
pgc.mutex.Lock()
select {
case <-pgc.stop:
pgc.mutex.Unlock()
return nil
default:
log.Println(err)
close(pgc.stop)
pgc.mutex.Unlock()
return err
}
}

pgc.done.Add(2)
ctx, cancel := context.WithCancel(context.Background())
// bizHandler
go func() {
defer func() {
if err := conn.Close(); err != nil {
log.Println(err)
}
pgc.done.Done()
cancel()
}()
pgc.handler(ctx, conn)
}()
// notifier
go func() {
defer pgc.done.Done()
select {
case <-pgc.stop:
cancel()
case <-ctx.Done():
}
}()
}
}

func (pgc *PureGoroutineServer) Close() error {
pgc.mutex.Lock()
defer pgc.mutex.Unlock()
err := pgc.serverTransport.Close()
if err != nil {
return err
}
close(pgc.stop)
pgc.done.Wait()
return nil
}
Loading

0 comments on commit 4e4b730

Please sign in to comment.