websocket rebuild 2019 #1178
Hmm, this is an easy fix; I will delay the next v11.2. I have a question though: do you think that this issue is related to the data race you had experienced before with ws? |
Yes, there are a few race errors happening on each event.
Currently the server can only take up to around 190-233 connections without the fix. These extreme conditions forced out the race errors, and all the mutexes and sync.Map failed to protect against them. With my patch it can take 500 and more connections. I have fixed the byte buffer and connection management issues. The code should look like this: Put releases a byte buffer obtained via Get back to the pool.
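(For reference, a minimal sketch of what such a pool-backed Get/Put pair looks like, modeled on valyala/bytebufferpool; this is only an illustration, not the actual patch.)
package bufpool

import "sync"

// ByteBuffer is a minimal growable byte buffer.
type ByteBuffer struct {
    B []byte
}

var pool = sync.Pool{
    New: func() interface{} { return new(ByteBuffer) },
}

// Get returns an empty byte buffer from the pool.
func Get() *ByteBuffer {
    return pool.Get().(*ByteBuffer)
}

// Put releases a byte buffer obtained via Get back to the pool.
// The caller must not touch the buffer after Put returns, otherwise
// it races with whoever Gets it next.
func Put(b *ByteBuffer) {
    b.B = b.B[:0]
    pool.Put(b)
}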
|
Also, I have custom-built a channel-based sync map specifically for connection management, to take care of the connection race issues under resource-limited conditions.
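(Not the actual implementation, just a minimal sketch of the channel-based idea with made-up names: every map operation is funneled through a single goroutine, so no mutex or sync.Map is needed and concurrent access can never race.)
package connstore

// Conn stands in for the websocket connection type; only ID() is needed here.
type Conn interface {
    ID() string
}

// Store owns the connections map; all access goes through the ops channel.
type Store struct {
    ops chan func(map[string]Conn)
}

func New() *Store {
    s := &Store{ops: make(chan func(map[string]Conn))}
    go func() {
        conns := make(map[string]Conn)
        for op := range s.ops {
            op(conns) // executed one at a time, by this goroutine only
        }
    }()
    return s
}

func (s *Store) Add(c Conn) {
    s.ops <- func(conns map[string]Conn) { conns[c.ID()] = c }
}

func (s *Store) Remove(id string) {
    s.ops <- func(conns map[string]Conn) { delete(conns, id) }
}

func (s *Store) Get(id string) Conn {
    res := make(chan Conn, 1)
    s.ops <- func(conns map[string]Conn) { res <- conns[id] }
    return <-res
}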
|
Hey, that is fairly good code you provided to us. Did you run any kind of benchmark to see the downsides in terms of performance? |
@jjhesk I asked for the test code snippet you used to test those; you may not want to share it, so I made one that can simulate your case:
And, as expected, I don't have any issues running them on the same machine with limited resources. Here it is:
$ cd ./server && go run -race main.go
server/main.go
package main
import (
"fmt"
"sync/atomic"
"time"
"github.com/kataras/iris"
"github.com/kataras/iris/websocket"
)
func main() {
app := iris.New()
ws := websocket.New(websocket.Config{})
ws.OnConnection(handleConnection)
app.Get("/socket", ws.Handler())
go func() {
t := time.NewTicker(2 * time.Second)
for {
<-t.C
conns := ws.GetConnections()
for _, conn := range conns {
fmt.Println(conn.ID())
}
if atomic.LoadUint64(&count) == 1200 {
fmt.Println("ALL CLIENTS DISCONNECTED")
t.Stop()
return
}
}
}()
app.Run(iris.Addr(":8080"))
}
func handleConnection(c websocket.Connection) {
c.OnDisconnect(func() { handleDisconnect(c) })
}
var count uint64
func handleDisconnect(c websocket.Connection) {
atomic.AddUint64(&count, 1)
fmt.Println("client disconnected!")
}
$ cd ./client && go run -race main.go
client/main.go
package main
import (
"fmt"
"sync"
"time"
xwebsocket "golang.org/x/net/websocket"
)
var (
origin = "http://localhost/"
url = "ws://localhost:8080/socket"
)
func main() {
wg := new(sync.WaitGroup)
for i := 0; i < 600; i++ {
wg.Add(1)
go connect(wg, 5*time.Second)
}
for i := 0; i < 600; i++ {
wg.Add(1)
go connect(wg, 3*time.Second)
}
wg.Wait()
fmt.Println("ALL OK")
time.Sleep(5 * time.Second)
}
func connect(wg *sync.WaitGroup, alive time.Duration) {
conn, err := xwebsocket.Dial(url, "", origin)
if err != nil {
panic(err)
}
go func() {
time.Sleep(alive)
if err := conn.Close(); err != nil {
panic(err)
}
wg.Done()
}()
}
Please give me some example code to test your case; I can really help you, because it may not be an iris issue at all. Thank you a lot! |
Hello @jjhesk, I am in the process of creating a websocket client for Go client apps as well; see the whole progress at #1175. Based on these I made a better example which covers your test case 100% (+ emitting data and pings, 600 random disconnections and 600 same-time disconnections). It had tricky parts but they are all fixed. You can also verify that; take a look below:
server/main.go
package main
import (
"fmt"
"os"
"sync/atomic"
"time"
"github.com/kataras/iris"
"github.com/kataras/iris/websocket"
)
const totalClients = 1200
func main() {
app := iris.New()
// websocket.Config{PingPeriod: ((60 * time.Second) * 9) / 10}
ws := websocket.New(websocket.Config{})
ws.OnConnection(handleConnection)
app.Get("/socket", ws.Handler())
go func() {
t := time.NewTicker(2 * time.Second)
for {
<-t.C
conns := ws.GetConnections()
for _, conn := range conns {
// fmt.Println(conn.ID())
// Do nothing.
_ = conn
}
if atomic.LoadUint64(&count) == totalClients {
fmt.Println("ALL CLIENTS DISCONNECTED SUCCESSFULLY.")
t.Stop()
os.Exit(0)
return
}
}
}()
app.Run(iris.Addr(":8080"))
}
func handleConnection(c websocket.Connection) {
c.OnError(func(err error) { handleErr(c, err) })
c.OnDisconnect(func() { handleDisconnect(c) })
c.On("chat", func(message string) {
c.To(websocket.Broadcast).Emit("chat", c.ID()+": "+message)
})
}
var count uint64
func handleDisconnect(c websocket.Connection) {
atomic.AddUint64(&count, 1)
fmt.Printf("client [%s] disconnected!\n", c.ID())
}
func handleErr(c websocket.Connection, err error) {
fmt.Printf("client [%s] errored: %v\n", c.ID(), err)
}
client/main.go
package main
import (
"bufio"
"fmt"
"math/rand"
"os"
"sync"
"time"
"github.com/kataras/iris/websocket"
)
var (
url = "ws://localhost:8080/socket"
f *os.File
)
const totalClients = 1200
func main() {
var err error
f, err = os.Open("./test.data")
if err != nil {
panic(err)
}
defer f.Close()
wg := new(sync.WaitGroup)
for i := 0; i < totalClients/2; i++ {
wg.Add(1)
go connect(wg, 5*time.Second)
}
for i := 0; i < totalClients/2; i++ {
wg.Add(1)
waitTime := time.Duration(rand.Intn(10)) * time.Millisecond
time.Sleep(waitTime)
go connect(wg, 10*time.Second+waitTime)
}
wg.Wait()
fmt.Println("ALL OK.")
time.Sleep(5 * time.Second)
}
func connect(wg *sync.WaitGroup, alive time.Duration) {
c, err := websocket.Dial(url, websocket.ConnectionConfig{})
if err != nil {
panic(err)
}
c.OnError(func(err error) {
fmt.Printf("error: %v", err)
})
disconnected := false
c.OnDisconnect(func() {
fmt.Printf("I am disconnected after [%s].\n", alive)
disconnected = true
})
c.On("chat", func(message string) {
fmt.Printf("\n%s\n", message)
})
go func() {
time.Sleep(alive)
if err := c.Disconnect(); err != nil {
panic(err)
}
wg.Done()
}()
scanner := bufio.NewScanner(f)
for !disconnected {
if !scanner.Scan() || scanner.Err() != nil {
break
}
c.Emit("chat", scanner.Text())
}
}
|
@kataras hi there, I used nodejs for the test case. I will write up some test cases that only work on my further developed application APIs. But I will get you the starter test cases as well. |
@kataras I have the latest update of the whole websocket engine; there are still a lot of bugs from using it. I think we should take a look at the work another team presented at a recent conference. This repository demonstrates how a very high number of websocket connections can be maintained efficiently on Linux: https://github.com/eranyanay/1m-go-websockets I think I will switch to this, but I want to follow the same format as the current tags and related items. |
I think we have to follow their findings and integrate them into iris-go. Check it out: https://medium.freecodecamp.org/million-websockets-and-go-cc58418460bb From what I understand now, there are 2 major caveats:
Yeah, I have been learning a lot about stability and optimizations. There are a lot of articles to read. |
An additional piece to add is the epoller object. Here is more tooling: https://github.com/mailru/easygo/tree/master/netpoll
package main
import (
"golang.org/x/sys/unix"
"log"
"net"
"reflect"
"sync"
"syscall"
)
type epoll struct {
fd int
connections map[int]net.Conn
lock *sync.RWMutex
}
func MkEpoll() (*epoll, error) {
fd, err := unix.EpollCreate1(0)
if err != nil {
return nil, err
}
return &epoll{
fd: fd,
lock: &sync.RWMutex{},
connections: make(map[int]net.Conn),
}, nil
}
func (e *epoll) Add(conn net.Conn) error {
// Extract file descriptor associated with the connection
fd := websocketFD(conn)
err := unix.EpollCtl(e.fd, syscall.EPOLL_CTL_ADD, fd, &unix.EpollEvent{Events: unix.POLLIN | unix.POLLHUP, Fd: int32(fd)})
if err != nil {
return err
}
e.lock.Lock()
defer e.lock.Unlock()
e.connections[fd] = conn
if len(e.connections)%100 == 0 {
log.Printf("Total number of connections: %v", len(e.connections))
}
return nil
}
func (e *epoll) Remove(conn net.Conn) error {
fd := websocketFD(conn)
err := unix.EpollCtl(e.fd, syscall.EPOLL_CTL_DEL, fd, nil)
if err != nil {
return err
}
e.lock.Lock()
defer e.lock.Unlock()
delete(e.connections, fd)
if len(e.connections)%100 == 0 {
log.Printf("Total number of connections: %v", len(e.connections))
}
return nil
}
func (e *epoll) Wait() ([]net.Conn, error) {
events := make([]unix.EpollEvent, 100)
n, err := unix.EpollWait(e.fd, events, 100)
if err != nil {
return nil, err
}
e.lock.RLock()
defer e.lock.RUnlock()
var connections []net.Conn
for i := 0; i < n; i++ {
conn := e.connections[int(events[i].Fd)]
connections = append(connections, conn)
}
return connections, nil
}
func websocketFD(conn net.Conn) int {
//tls := reflect.TypeOf(conn.UnderlyingConn()) == reflect.TypeOf(&tls.Conn{})
// Extract the file descriptor associated with the connection
//connVal := reflect.Indirect(reflect.ValueOf(conn)).FieldByName("conn").Elem()
tcpConn := reflect.Indirect(reflect.ValueOf(conn)).FieldByName("conn")
//if tls {
// tcpConn = reflect.Indirect(tcpConn.Elem())
//}
fdVal := tcpConn.FieldByName("fd")
pfdVal := reflect.Indirect(fdVal).FieldByName("pfd")
return int(pfdVal.FieldByName("Sysfd").Int())
}
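To show how this epoller is meant to be driven, here is a rough usage sketch, simplified from the linked repository and not part of iris: it assumes it sits in the same package as the epoll type above, uses a hypothetical port, and upgrades with gobwas/ws (mentioned later in this thread). A single wait loop serves every readable connection instead of one reader goroutine per connection.
package main

import (
    "log"
    "net"

    "github.com/gobwas/ws"
    "github.com/gobwas/ws/wsutil"
)

func main() {
    epoller, err := MkEpoll()
    if err != nil {
        log.Fatal(err)
    }

    // One goroutine waits on epoll and reads from whichever connections are ready.
    go func() {
        for {
            conns, err := epoller.Wait()
            if err != nil {
                log.Printf("epoll wait error: %v", err)
                continue
            }
            for _, conn := range conns {
                if conn == nil {
                    continue
                }
                if _, _, err := wsutil.ReadClientData(conn); err != nil {
                    // Read failed: the client is gone, deregister and close.
                    if err := epoller.Remove(conn); err != nil {
                        log.Printf("remove: %v", err)
                    }
                    conn.Close()
                }
            }
        }
    }()

    ln, err := net.Listen("tcp", ":8000")
    if err != nil {
        log.Fatal(err)
    }
    for {
        conn, err := ln.Accept()
        if err != nil {
            continue
        }
        // Zero-copy upgrade on the raw net.Conn, then hand it to the epoller.
        if _, err := ws.Upgrade(conn); err != nil {
            conn.Close()
            continue
        }
        if err := epoller.Add(conn); err != nil {
            conn.Close()
        }
    }
}
|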
optimisation 2
import (
"net"
"time"
"github.com/gobwas/ws"
"github.com/mailru/easygo/netpoll"
)
// Note: pool (a goroutine pool with Schedule/ScheduleTimeout), poller (a
// netpoll.Poller) and NewChannel are assumed to be defined elsewhere, as in
// the linked article.
ln, _ := net.Listen("tcp", ":8080")
for {
// Try to accept an incoming connection inside a free pool worker.
// If there are no free workers for 1ms, do not accept anything and try again later.
// This helps us prevent self-DDoS and out-of-resource-limit cases.
err := pool.ScheduleTimeout(time.Millisecond, func() {
conn, _ := ln.Accept()
_, _ = ws.Upgrade(conn)
// Wrap WebSocket connection with our Channel struct.
// This will help us to handle/send our app's packets.
ch := NewChannel(conn)
// Wait for incoming bytes from connection.
poller.Start(conn, netpoll.EventRead, func() {
// Do not cross the resource limits.
pool.Schedule(func() {
// Read and handle incoming packet(s).
ch.Receive()
})
})
})
if err != nil {
time.Sleep(time.Millisecond)
}
} |
If you want to help out and bring iris features into your ongoing transition to that library, I would love to participate and help you, so I would be more prepared to offer something like this for all iris users too. You can contact me on
|
…. It implements the gobwas/ws library (it works but needs fixes on determining closing connections) as suggested at: #1178
@jjhesk I couldn't wait until tomorrow nor sleep without taking action based on your comments. So I tried to stay awake and code a temp |
Right, open a new package, put everything into it, and test it out! The coverage testing setup should be something like this...
rundown test
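(As a hypothetical stand-in for the rundown test, since the actual setup is not shown here: a minimal _test.go file that opens many concurrent clients against the earlier server example, assumed to be listening on :8080, and is run with go test -race -cover. The package name and client count are made up.)
package websocket2_test

import (
    "sync"
    "testing"
    "time"

    xwebsocket "golang.org/x/net/websocket"
)

// TestManyConnections is a smoke test: it dials many clients at once and
// keeps them alive briefly, so `go test -race -cover` can flag data races
// in the server-side connection handling.
func TestManyConnections(t *testing.T) {
    const clients = 50
    var wg sync.WaitGroup
    for i := 0; i < clients; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            conn, err := xwebsocket.Dial("ws://localhost:8080/socket", "", "http://localhost/")
            if err != nil {
                t.Error(err)
                return
            }
            time.Sleep(2 * time.Second)
            if err := conn.Close(); err != nil {
                t.Error(err)
            }
        }()
    }
    wg.Wait()
}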
|
My work-in-progress repo is https://github.com/GoLandr/iris for the websocket upgrade session. I would suggest opening a branch or a dedicated pull request just for sorting out these problems. |
Another finished design, by https://github.com/faceair/fastsocket, that can be used as a reference. This part is really hard to implement into the existing design: https://github.com/faceair/fastsocket/blob/2873265ab4241cc42eb8d139e670ca5b7516f207/server.go @kataras I need your help. |
Yes, noticed and commented on the PR. |
@kataras the websocket2 package is working correctly, so I'm not going to modify it. I am running a series of tests against different cases for this package. On the other hand, |
From websocket2 alone, so far I got this: it's running pretty well under 50 connections with some operations.
12 hours of testing with 3 data races detected. |
@kataras has this part of the code from the websocket2 package compiled the rule using
func (s *Server) Handler() context.Handler {
return func(ctx context.Context) {
c := s.Upgrade(ctx)
if c.Err() != nil {
return
}
// NOTE TO ME: fire these first BEFORE startReader and startPinger
// in order to set the events and any messages to send
// the startPinger will send the OK to the client and only
// then the client is able to send and receive from Server
// when all things are ready and only then. DO NOT change this order.
// fire the on connection event callbacks, if any
for i := range s.onConnectionListeners {
s.onConnectionListeners[i](c)
}
// start the ping and the messages reader
c.Wait()
}
} |
…ng a bit lower level of the new ws lib api and restore the previous sync.Map for server's live connections, relative: #1178
From the above testing result (e5d0702): @kataras I updated your part and did another test. I got the detection below.
==================
WARNING: DATA RACE
Write at 0x00c0003a2480 by goroutine 134:
runtime.slicecopy()
/root/.go/src/runtime/slice.go:221 +0x0
_/root/compiledxx/backendc/main/temp.(*messageSerializer).serialize()
/root/go/src/github.com/valyala/bytebufferpool/bytebuffer.go:73 +0x17b
_/root/compiledxx/backendc/main/temp.(*emitter).Emit()
/root/compiledxx/backendc/main/temp/emitter.go:37 +0xa0
_/root/compiledxx/backendc/main/core_x.AnnounceProfileUpdate.func1()
/root/compiledxx/backendc/main/core_x/ws_core.go:516 +0x191
_/root/compiledxx/backendc/main/core_x.ExecSyncRountine()
/root/compiledxx/backendc/main/core_x/connectionLock.go:60 +0x74
Previous read at 0x00c0003a2480 by goroutine 24:
internal/race.ReadRange()
/root/.go/src/internal/race/race.go:45 +0x42
syscall.Write()
/root/.go/src/syscall/syscall_unix.go:193 +0xaa
internal/poll.(*FD).Write()
/root/.go/src/internal/poll/fd_unix.go:268 +0x1d8
net.(*netFD).Write()
/root/.go/src/net/fd_unix.go:220 +0x65
net.(*conn).Write()
/root/.go/src/net/net.go:189 +0xa0
net.(*TCPConn).Write()
<autogenerated>:1 +0x69
github.com/gobwas/ws.WriteFrame()
/root/go/src/github.com/gobwas/ws/write.go:111 +0xd7
github.com/gobwas/ws/wsutil.writeFrame()
/root/go/src/github.com/gobwas/ws/wsutil/writer.go:449 +0x1ca
github.com/gobwas/ws/wsutil.WriteMessage()
/root/go/src/github.com/gobwas/ws/wsutil/helper.go:161 +0x81
_/root/compiledxx/backendc/main/temp.(*connection).Write()
/root/compiledxx/backendc/main/temp/connection.go:377 +0x166
_/root/compiledxx/backendc/main/temp.(*connection).writeDefault()
/root/compiledxx/backendc/main/temp/connection.go:389 +0x76
_/root/compiledxx/backendc/main/temp.(*Server).emitMessage()
/root/compiledxx/backendc/main/temp/server.go:366 +0x25d
_/root/compiledxx/backendc/main/temp.(*emitter).EmitMessage()
/root/compiledxx/backendc/main/temp/emitter.go:32 +0x127
_/root/compiledxx/backendc/main/temp.(*emitter).Emit()
/root/compiledxx/backendc/main/temp/emitter.go:41 +0x10e
_/root/compiledxx/backendc/main/temp.(*connection).Emit()
/root/compiledxx/backendc/main/temp/connection.go:695 +0x1c6
_/root/compiledxx/backendc/main/core_x.EmitToAllSubbed()
/root/compiledxx/backendc/main/core_x/ws_core.go:405 +0x23d
_/root/compiledxx/backendc/main/core_x.loopMonitorBB.func1()
/root/compiledxx/backendc/main/core_x/bd_bigbang.go:39 +0x3ed
_/root/compiledxx/backendc/main/core_x.CoreLoopEngineV2()
/root/compiledxx/backendc/main/core_x/bd_core.go:31 +0x4fe
_/root/compiledxx/backendc/main/core_x.loopMonitorBB()
/root/compiledxx/backendc/main/core_x/bd_bigbang.go:12 +0xe0
Goroutine 134 (running) created at:
_/root/compiledxx/backendc/main/core_x.AnnounceProfileUpdate()
/root/compiledxx/backendc/main/core_x/ws_core.go:502 +0x105
_/root/compiledxx/backendc/main/core_x.loopMonitorBB.func1.2()
/root/compiledxx/backendc/main/core_x/bd_bigbang.go:36 +0x1ad
_/root/compiledxx/backendc/main/core_x.(*BBGame).ReconsultAfterExplode.func1()
/root/compiledxx/backendc/main/core_x/bd_bgb.go:467 +0x4c0
sync.(*Map).Range()
/root/.go/src/sync/map.go:337 +0x13c
_/root/compiledxx/backendc/main/core_x.(*BBGame).ReconsultAfterExplode()
/root/compiledxx/backendc/main/core_x/bd_bgb.go:429 +0xa5
_/root/compiledxx/backendc/main/core_x.loopMonitorBB.func1()
/root/compiledxx/backendc/main/core_x/bd_bigbang.go:34 +0x394
_/root/compiledxx/backendc/main/core_x.CoreLoopEngineV2()
/root/compiledxx/backendc/main/core_x/bd_core.go:31 +0x4fe
_/root/compiledxx/backendc/main/core_x.loopMonitorBB()
/root/compiledxx/backendc/main/core_x/bd_bigbang.go:12 +0xe0
Goroutine 24 (running) created at:
_/root/compiledxx/backendc/main/core_x.StartWebsocket()
/root/compiledxx/backendc/main/core_x/ws_core.go:394 +0x336
_/root/compiledxx/backendc/main/core_x.SetupWebCombineClient()
/root/compiledxx/backendc/main/core_x/webhost.go:58 +0x788
main.startProc()
/root/compiledxx/backendc/main/main.go:57 +0x103
main.main()
/root/compiledxx/backendc/main/main.go:36 +0x57
==================
Limit on connections: 54. After switching to sync.Map the performance is slower and there is a limit on the max connections, which is 54 in my test case. Once it reaches this number, the server simply times out all later connections.
|
@kataras are there any ways to avoid |
I will update you with the latest fix on the websocket2 package. |
… commit fixes the kataras#1178 and kataras#1173)
…. It implements the gobwas/ws library (it works but need fixes on determinate closing connections) as suggested at: kataras#1178
…ng a bit lower level of the new ws lib api and restore the previous sync.Map for server's live connections, relative: kataras#1178
I have got malformed JSON from the websocket output. I currently found out that it is caused by a data race in the data transmission; it is the same issue caused by unsafe access in ByteBuffer.
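(For what it's worth, one common way to avoid this class of race, as an illustration of the idea rather than the actual iris fix: copy the serialized bytes out of the pooled ByteBuffer before the buffer can be reused, so the goroutine doing the network write never shares memory with the next serialize call. The event payload below is made up.)
package main

import (
    "fmt"

    "github.com/valyala/bytebufferpool"
)

// copyForWrite copies the serialized payload out of the pooled buffer
// before the buffer is put back, so a concurrent writer never reads
// bytes that the next serialize call may overwrite.
func copyForWrite(b *bytebufferpool.ByteBuffer) []byte {
    out := make([]byte, len(b.B))
    copy(out, b.B)
    return out
}

func main() {
    buf := bytebufferpool.Get()
    buf.WriteString(`{"event":"chat"}`) // hypothetical serialized message
    msg := copyForWrite(buf)
    bytebufferpool.Put(buf) // safe: msg no longer aliases the pooled memory
    fmt.Println(string(msg))
}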