Skip to content

Commit

Permalink
fix cdp message handling on browser shutdown
Browse files Browse the repository at this point in the history
Fixes #511
  • Loading branch information
egonelbre authored and ysmood committed Nov 26, 2021
1 parent f9a38dc commit de14d28
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 109 deletions.
187 changes: 134 additions & 53 deletions lib/cdp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package cdp
import (
"context"
"encoding/json"
"errors"
"net/http"
"sync"
"sync/atomic"
Expand All @@ -21,7 +22,9 @@ type Client struct {
header http.Header
ws WebSocketable

callbacks *sync.Map // buffer for response from browser
muSend sync.Mutex

pending *pendingRequests // buffer for response from browser

chReq chan []byte // request from user
chRes chan *Response // response from browser
Expand Down Expand Up @@ -68,12 +71,12 @@ type WebSocketable interface {
// New creates a cdp connection, all messages from Client.Event must be received or they will block the client.
func New(websocketURL string) *Client {
return &Client{
callbacks: &sync.Map{},
chReq: make(chan []byte),
chRes: make(chan *Response),
chEvent: make(chan *Event),
wsURL: websocketURL,
logger: defaults.CDP,
pending: newPendingRequests(),
chReq: make(chan []byte),
chRes: make(chan *Response),
chEvent: make(chan *Event),
wsURL: websocketURL,
logger: defaults.CDP,
}
}

Expand Down Expand Up @@ -112,8 +115,6 @@ func (cdp *Client) Connect(ctx context.Context) error {
cdp.ctx = ctx
cdp.close = cancel

go cdp.consumeMsg()

go cdp.readMsgFromBrowser()

return nil
Expand All @@ -127,6 +128,10 @@ func (cdp *Client) MustConnect(ctx context.Context) *Client {

// Call a method and get its response, if ctx is nil context.Background() will be used
func (cdp *Client) Call(ctx context.Context, sessionID, method string, params interface{}) ([]byte, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}

req := &Request{
ID: int(atomic.AddUint64(&cdp.count, 1)),
SessionID: sessionID,
Expand All @@ -139,29 +144,25 @@ func (cdp *Client) Call(ctx context.Context, sessionID, method string, params in
data, err := json.Marshal(req)
utils.E(err)

callback := make(chan *Response)

cdp.callbacks.Store(req.ID, callback)
defer cdp.callbacks.Delete(req.ID)

select {
case <-cdp.ctx.Done():
return nil, &errConnClosed{cdp.ctx.Err()}

case <-ctx.Done():
return nil, ctx.Err()
pending := newPendingRequest()
if err := cdp.pending.add(req.ID, pending); err != nil {
return nil, err
}
defer cdp.pending.delete(req.ID)

case cdp.chReq <- data:
if err := cdp.sendMsg(data); err != nil {
return nil, err
}

select {
case <-cdp.ctx.Done():
return nil, &errConnClosed{cdp.ctx.Err()}

case <-ctx.Done():
return nil, ctx.Err()

case res := <-callback:
case r := <-pending.result:
if r.err != nil {
return nil, r.err
}
res := r.response
if res.Error != nil {
return nil, res.Error
}
Expand All @@ -175,35 +176,22 @@ func (cdp *Client) Event() <-chan *Event {
return cdp.chEvent
}

// consume messages from client and browser
func (cdp *Client) consumeMsg() {
for {
select {
case <-cdp.ctx.Done():
return
func (cdp *Client) sendMsg(data []byte) error {
cdp.muSend.Lock()
defer cdp.muSend.Unlock()

case data := <-cdp.chReq:
err := cdp.ws.Send(data)
if err != nil {
cdp.wsClose(err)
return
}

case res := <-cdp.chRes:
callback, has := cdp.callbacks.Load(res.ID)
if has {
select {
case <-cdp.ctx.Done():
return
case callback.(chan *Response) <- res:
}
}
}
err := cdp.ws.Send(data)
if err != nil {
cdp.wsClose(err)
return err
}

return nil
}

func (cdp *Client) readMsgFromBrowser() {
defer close(cdp.chEvent)
defer cdp.wsClose(nil)

for {
data, err := cdp.ws.Read()
Expand All @@ -223,11 +211,7 @@ func (cdp *Client) readMsgFromBrowser() {
err := json.Unmarshal(data, &res)
utils.E(err)
cdp.logger.Println(&res)
select {
case <-cdp.ctx.Done():
return
case cdp.chRes <- &res:
}
cdp.pending.fulfill(id.ID, &res)
} else {
var evt Event
err := json.Unmarshal(data, &evt)
Expand All @@ -244,5 +228,102 @@ func (cdp *Client) readMsgFromBrowser() {

func (cdp *Client) wsClose(err error) {
cdp.logger.Println(err)
cdp.pending.close(&errConnClosed{err})
cdp.close()
}

// pendingRequests tracks requests that have been sent to the satellite.
type pendingRequests struct {
mu sync.Mutex
err error
pending map[int]*pendingRequest
}

func newPendingRequests() *pendingRequests {
return &pendingRequests{
pending: make(map[int]*pendingRequest),
}
}

// close marks the requests as not being able to make new requests.
// It will also close any pending requests.
func (reqs *pendingRequests) close(err error) {
reqs.mu.Lock()
defer reqs.mu.Unlock()

if reqs.err != nil {
return
}

if err == nil {
err = errors.New("browser has shut down")
}
reqs.err = err

for _, pending := range reqs.pending {
pending.close(err)
}
reqs.pending = map[int]*pendingRequest{}
}

// add adds a new pending request. When the browser has disconnected
// then it will return an error.
func (reqs *pendingRequests) add(id int, resp *pendingRequest) error {
reqs.mu.Lock()
defer reqs.mu.Unlock()
if reqs.err != nil {
return reqs.err
}
reqs.pending[id] = resp
return nil
}

// fulfill fills in a pending request and removes from the map.
func (reqs *pendingRequests) fulfill(id int, r *Response) {
reqs.mu.Lock()
defer reqs.mu.Unlock()

pending, ok := reqs.pending[id]
if !ok {
return
}
pending.respond(r)
delete(reqs.pending, id)
}

// delete delets a pending request.
func (reqs *pendingRequests) delete(id int) {
reqs.mu.Lock()
defer reqs.mu.Unlock()
delete(reqs.pending, id)
}

type pendingRequest struct {
done sync.Once
result chan pendingResponse
}

type pendingResponse struct {
response *Response
err error
}

func newPendingRequest() *pendingRequest {
return &pendingRequest{result: make(chan pendingResponse, 1)}
}

func (pending *pendingRequest) respond(r *Response) {
select {
case pending.result <- pendingResponse{response: r}:
default:
}
pending.done.Do(func() { close(pending.result) })
}

func (pending *pendingRequest) close(err error) {
select {
case pending.result <- pendingResponse{err: err}:
default:
}
pending.done.Do(func() { close(pending.result) })
}
Loading

0 comments on commit de14d28

Please sign in to comment.