Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add browser dialer for splithttp
Browse files Browse the repository at this point in the history
mmmray committed Jun 29, 2024
1 parent 8320732 commit 2347da1
Showing 11 changed files with 508 additions and 287 deletions.
125 changes: 125 additions & 0 deletions transport/internet/browser_dialer/dialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package browser_dialer

import (
"bytes"
_ "embed"
"encoding/base64"
"net/http"
"time"

"github.com/gorilla/websocket"
"github.com/xtls/xray-core/common/platform"
"github.com/xtls/xray-core/common/uuid"
)

//go:embed dialer.html
var webpage []byte

var conns chan *websocket.Conn

var upgrader = &websocket.Upgrader{
ReadBufferSize: 0,
WriteBufferSize: 0,
HandshakeTimeout: time.Second * 4,
CheckOrigin: func(r *http.Request) bool {
return true
},
}

func init() {
addr := platform.NewEnvFlag(platform.BrowserDialerAddress).GetValue(func() string { return "" })
if addr != "" {
token := uuid.New()
csrfToken := token.String()
webpage = bytes.ReplaceAll(webpage, []byte("csrfToken"), []byte(csrfToken))
conns = make(chan *websocket.Conn, 256)
go http.ListenAndServe(addr, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/websocket" {
if r.URL.Query().Get("token") == csrfToken {
if conn, err := upgrader.Upgrade(w, r, nil); err == nil {
conns <- conn
} else {
newError("Browser dialer http upgrade unexpected error").AtError().WriteToLog()
}
}
} else {
w.Write(webpage)
}
}))
}
}

func HasBrowserDialer() bool {
return conns != nil;
}

func DialWS(uri string, ed []byte) (*websocket.Conn, error) {
data := []byte("WS "+uri)
if ed != nil {
data = append(data, " "+base64.RawURLEncoding.EncodeToString(ed)...)
}

return dialRaw(data)
}

func DialGet(uri string) (*websocket.Conn, error) {
data := []byte("GET "+uri)
return dialRaw(data)
}

func DialPost(uri string, payload []byte) error {
data := []byte("POST "+uri)
conn, err := dialRaw(data)
if err != nil {
return err
}

err = conn.WriteMessage(websocket.BinaryMessage, payload)
if err != nil {
return err
}

// send "EOF"
err = conn.WriteMessage(websocket.BinaryMessage, []byte{})
if err != nil {
return err
}

err = CheckOK(conn)
if err != nil {
return err
}

conn.Close()
return nil
}

func dialRaw(data []byte) (*websocket.Conn, error) {
var conn *websocket.Conn
for {
conn = <-conns
if conn.WriteMessage(websocket.TextMessage, data) != nil {
conn.Close()
} else {
break
}
}
err := CheckOK(conn)
if err != nil {
return nil, err
}

return conn, nil
}

func CheckOK(conn *websocket.Conn) error {
if _, p, err := conn.ReadMessage(); err != nil {
conn.Close()
return err
} else if s := string(p); s != "ok" {
conn.Close()
return newError(s)
}

return nil
}
105 changes: 105 additions & 0 deletions transport/internet/browser_dialer/dialer.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
<!-- vim: set noexpandtab: -->
<!DOCTYPE html>
<html>
<head>
<title>Browser Dialer</title>
</head>
<body>
<script>
// Copyright (c) 2021 XRAY. Mozilla Public License 2.0.
var url = "ws://" + window.location.host + "/websocket?token=csrfToken"
var count = 0
setInterval(check, 1000)
function check() {
if (count <= 0) {
count += 1
console.log("Prepare", url)
var ws = new WebSocket(url)
// arraybuffer is significantly faster in chrome than default
// blob, tested with chrome 123
ws.binaryType = "arraybuffer";
ws.onmessage = function (event) {
count -= 1;
let [method, url, protocol] = event.data.split(" ");
if (method == "WS") {
console.log("Dial WS", url, protocol);
const wss = new WebSocket(url, protocol);
wss.binaryType = "arraybuffer";
var opened = false;
ws.onmessage = function (event) {
wss.send(event.data)
}
wss.onopen = function (event) {
opened = true;
ws.send("ok")
}
wss.onmessage = function (event) {
ws.send(event.data)
}
wss.onclose = function (event) {
ws.close()
}
wss.onerror = function (event) {
!opened && ws.send("fail")
wss.close()
}
ws.onclose = function (event) {
wss.close()
}
} else if (method == "GET") {
(async () => {
console.log("Dial GET", url);
ws.send("ok");
const controller = new AbortController();
const response = await fetch(url, {signal: controller.signal});
const body = await response.body;
const reader = body.getReader();

ws.onclose = function (event) {
try {
controller.abort();
} catch(e) {}
}

while (true) {
const { done, value } = await reader.read();
console.log("chunk download");
ws.send(value);
if (done) break;
}

ws.close();
})()
} else if (method == "POST") {
console.log("Dial POST", url);
ws.send("ok");
let body;
ws.onmessage = function (event) {
if (event.data.byteLength == 0) {
console.log("start upload");
(async () => {
const response = await fetch(
url,
{method: "POST", body}
);
if (response.ok) {
ws.send("ok");
ws.close();
}
})();
} else {
body = event.data;
}
};
}

check()
}
ws.onerror = function (event) {
ws.close()
}
}
}
</script>
</body>
</html>
9 changes: 9 additions & 0 deletions transport/internet/browser_dialer/errors.generated.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package browser_dialer

import "github.com/xtls/xray-core/common/errors"

type errPathObjHolder struct{}

func newError(values ...interface{}) *errors.Error {
return errors.New(values...).WithPathObj(errPathObjHolder{})
}
39 changes: 39 additions & 0 deletions transport/internet/splithttp/browser_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package splithttp

import (
"context"
"io"
"io/ioutil"
gonet "net"

"github.com/xtls/xray-core/transport/internet/browser_dialer"
"github.com/xtls/xray-core/transport/internet/websocket"
)

// implements splithttp.DialerClient in terms of browser dialer
// has no fields because everything is global state :O)
type BrowserDialerClient struct{}

func (c *BrowserDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
conn, err := browser_dialer.DialGet(baseURL)
dummyAddr := &gonet.IPAddr{}
if err != nil {
return nil, dummyAddr, dummyAddr, err
}

return websocket.NewConnection(conn, dummyAddr, nil), conn.RemoteAddr(), conn.LocalAddr(), nil
}

func (c *BrowserDialerClient) SendUploadRequest(ctx context.Context, url string, payload io.ReadWriteCloser) error {
bytes, err := ioutil.ReadAll(payload)
if err != nil {
return err
}

err = browser_dialer.DialPost(url, bytes)
if err != nil {
return err
}

return nil
}
154 changes: 154 additions & 0 deletions transport/internet/splithttp/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package splithttp

import (
"context"
"io"
gonet "net"
"net/http"
"net/http/httptrace"
"sync"

"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/signal/done"
)

// interface to abstract between use of browser dialer, vs net/http
type DialerClient interface {
// (ctx, baseURL, payload) -> err
// baseURL already contains sessionId and seq
SendUploadRequest(context.Context, string, io.ReadWriteCloser) error

// (ctx, baseURL) -> (downloadReader, remoteAddr, localAddr)
// baseURL already contains sessionId
OpenDownload(context.Context, string) (io.ReadCloser, net.Addr, net.Addr, error)
}

// implements splithttp.DialerClient in terms of direct network connections
type DefaultDialerClient struct {
TransportConfig *Config
Download *http.Client
Upload *http.Client
IsH2 bool
// pool of net.Conn, created using dialUploadConn
UploadRawPool *sync.Pool
DialUploadConn func(ctxInner context.Context) (net.Conn, error)
}

func (c *DefaultDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
var remoteAddr gonet.Addr
var localAddr gonet.Addr
// this is done when the TCP/UDP connection to the server was established,
// and we can unblock the Dial function and print correct net addresses in
// logs
gotConn := done.New()

var downResponse io.ReadCloser
gotDownResponse := done.New()

go func() {
trace := &httptrace.ClientTrace{
GotConn: func(connInfo httptrace.GotConnInfo) {
remoteAddr = connInfo.Conn.RemoteAddr()
localAddr = connInfo.Conn.LocalAddr()
gotConn.Close()
},
}

// in case we hit an error, we want to unblock this part
defer gotConn.Close()

req, err := http.NewRequestWithContext(
httptrace.WithClientTrace(ctx, trace),
"GET",
baseURL,
nil,
)
if err != nil {
newError("failed to construct download http request").Base(err).WriteToLog()
gotDownResponse.Close()
return
}

req.Header = c.TransportConfig.GetRequestHeader()

response, err := c.Download.Do(req)
gotConn.Close()
if err != nil {
newError("failed to send download http request").Base(err).WriteToLog()
gotDownResponse.Close()
return
}

if response.StatusCode != 200 {
response.Body.Close()
newError("invalid status code on download:", response.Status).WriteToLog()
gotDownResponse.Close()
return
}

downResponse = response.Body
gotDownResponse.Close()
}()

// we want to block Dial until we know the remote address of the server,
// for logging purposes
<-gotConn.Wait()

lazyDownload := &LazyReader{
CreateReader: func() (io.ReadCloser, error) {
<-gotDownResponse.Wait()
if downResponse == nil {
return nil, newError("downResponse failed")
}
return downResponse, nil
},
}

return lazyDownload, remoteAddr, localAddr, nil
}

func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string, payload io.ReadWriteCloser) error {
req, err := http.NewRequest("POST", url, payload)
if err != nil {
return err
}
req.Header = c.TransportConfig.GetRequestHeader()

if c.IsH2 {
resp, err := c.Upload.Do(req)
if err != nil {
return err
}

defer resp.Body.Close()

if resp.StatusCode != 200 {
return newError("bad status code:", resp.Status)
}
} else {
var err error
var uploadConn any
for i := 0; i < 5; i++ {
uploadConn = c.UploadRawPool.Get()
if uploadConn == nil {
uploadConn, err = c.DialUploadConn(ctx)
if err != nil {
return err
}
}

err = req.Write(uploadConn.(net.Conn))
if err == nil {
break
}
}

if err != nil {
return err
}

c.UploadRawPool.Put(uploadConn)
}

return nil
}
200 changes: 48 additions & 152 deletions transport/internet/splithttp/dialer.go
Original file line number Diff line number Diff line change
@@ -4,9 +4,7 @@ import (
"context"
gotls "crypto/tls"
"io"
gonet "net"
"net/http"
"net/http/httptrace"
"net/url"
"strconv"
"sync"
@@ -16,10 +14,10 @@ import (
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/common/signal/done"
"github.com/xtls/xray-core/common/signal/semaphore"
"github.com/xtls/xray-core/common/uuid"
"github.com/xtls/xray-core/transport/internet"
"github.com/xtls/xray-core/transport/internet/browser_dialer"
"github.com/xtls/xray-core/transport/internet/stat"
"github.com/xtls/xray-core/transport/internet/tls"
"github.com/xtls/xray-core/transport/pipe"
@@ -31,32 +29,31 @@ type dialerConf struct {
*internet.MemoryStreamConfig
}

type reusedClient struct {
download *http.Client
upload *http.Client
isH2 bool
// pool of net.Conn, created using dialUploadConn
uploadRawPool *sync.Pool
dialUploadConn func(ctxInner context.Context) (net.Conn, error)
}

var (
globalDialerMap map[dialerConf]reusedClient
globalDialerMap map[dialerConf]DialerClient
globalDialerAccess sync.Mutex
)

func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) reusedClient {
func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) DialerClient {
if browser_dialer.HasBrowserDialer() {
return &BrowserDialerClient{}
}

globalDialerAccess.Lock()
defer globalDialerAccess.Unlock()

if globalDialerMap == nil {
globalDialerMap = make(map[dialerConf]reusedClient)
globalDialerMap = make(map[dialerConf]DialerClient)
}

if client, found := globalDialerMap[dialerConf{dest, streamSettings}]; found {
return client
}

if browser_dialer.HasBrowserDialer() {
return &BrowserDialerClient{}
}

tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
isH2 := tlsConfig != nil && !(len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "http/1.1")

@@ -115,16 +112,17 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in
uploadTransport = nil
}

client := reusedClient{
download: &http.Client{
client := &DefaultDialerClient{
TransportConfig: streamSettings.ProtocolSettings.(*Config),
Download: &http.Client{
Transport: downloadTransport,
},
upload: &http.Client{
Upload: &http.Client{
Transport: uploadTransport,
},
isH2: isH2,
uploadRawPool: &sync.Pool{},
dialUploadConn: dialContext,
IsH2: isH2,
UploadRawPool: &sync.Pool{},
DialUploadConn: dialContext,
}

globalDialerMap[dialerConf{dest, streamSettings}] = client
@@ -159,80 +157,9 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me

httpClient := getHTTPClient(ctx, dest, streamSettings)

var remoteAddr gonet.Addr
var localAddr gonet.Addr
// this is done when the TCP/UDP connection to the server was established,
// and we can unblock the Dial function and print correct net addresses in
// logs
gotConn := done.New()

var downResponse io.ReadCloser
gotDownResponse := done.New()

sessionIdUuid := uuid.New()
sessionId := sessionIdUuid.String()

go func() {
trace := &httptrace.ClientTrace{
GotConn: func(connInfo httptrace.GotConnInfo) {
remoteAddr = connInfo.Conn.RemoteAddr()
localAddr = connInfo.Conn.LocalAddr()
gotConn.Close()
},
}

// in case we hit an error, we want to unblock this part
defer gotConn.Close()

req, err := http.NewRequestWithContext(
httptrace.WithClientTrace(context.WithoutCancel(ctx), trace),
"GET",
requestURL.String()+sessionId,
nil,
)
if err != nil {
newError("failed to construct download http request").Base(err).WriteToLog()
gotDownResponse.Close()
return
}

req.Header = transportConfiguration.GetRequestHeader()

response, err := httpClient.download.Do(req)
gotConn.Close()
if err != nil {
newError("failed to send download http request").Base(err).WriteToLog()
gotDownResponse.Close()
return
}

if response.StatusCode != 200 {
response.Body.Close()
newError("invalid status code on download:", response.Status).WriteToLog()
gotDownResponse.Close()
return
}

// skip "ooooooooook" response
trashHeader := []byte{0}
for {
_, err = io.ReadFull(response.Body, trashHeader)
if err != nil {
response.Body.Close()
newError("failed to read initial response").Base(err).WriteToLog()
gotDownResponse.Close()
return
}
if trashHeader[0] == 'k' {
break
}
}

downResponse = response.Body
gotDownResponse.Close()
}()

uploadUrl := requestURL.String() + sessionId + "/"
baseURL := requestURL.String() + sessionId

uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize))

@@ -251,85 +178,54 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me

<-requestsLimiter.Wait()

url := uploadUrl + strconv.FormatInt(requestCounter, 10)
seq := requestCounter
requestCounter += 1

go func() {
defer requestsLimiter.Signal()
req, err := http.NewRequest("POST", url, &buf.MultiBufferContainer{MultiBuffer: chunk})

err := httpClient.SendUploadRequest(
context.WithoutCancel(ctx),
baseURL+"/"+strconv.FormatInt(seq, 10),
&buf.MultiBufferContainer{MultiBuffer: chunk},
)

if err != nil {
newError("failed to send upload").Base(err).WriteToLog()
uploadPipeReader.Interrupt()
return
}

req.Header = transportConfiguration.GetRequestHeader()

if httpClient.isH2 {
resp, err := httpClient.upload.Do(req)
if err != nil {
newError("failed to send upload").Base(err).WriteToLog()
uploadPipeReader.Interrupt()
return
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
newError("failed to send upload, bad status code:", resp.Status).WriteToLog()
uploadPipeReader.Interrupt()
return
}
} else {
var err error
var uploadConn any
for i := 0; i < 5; i++ {
uploadConn = httpClient.uploadRawPool.Get()
if uploadConn == nil {
uploadConn, err = httpClient.dialUploadConn(context.WithoutCancel(ctx))
if err != nil {
newError("failed to connect upload").Base(err).WriteToLog()
uploadPipeReader.Interrupt()
return
}
}

err = req.Write(uploadConn.(net.Conn))
if err == nil {
break
}
}

if err != nil {
newError("failed to send upload").Base(err).WriteToLog()
uploadPipeReader.Interrupt()
return
}

httpClient.uploadRawPool.Put(uploadConn)
}
}()

}
}()

// we want to block Dial until we know the remote address of the server,
// for logging purposes
<-gotConn.Wait()

// necessary in order to send larger chunks in upload
bufferedUploadPipeWriter := buf.NewBufferedWriter(uploadPipeWriter)
bufferedUploadPipeWriter.SetBuffered(false)
lazyRawDownload, remoteAddr, localAddr, err := httpClient.OpenDownload(context.WithoutCancel(ctx), baseURL)
if err != nil {
return nil, err
}

lazyDownload := &LazyReader{
CreateReader: func() (io.ReadCloser, error) {
<-gotDownResponse.Wait()
if downResponse == nil {
return nil, newError("downResponse failed")
// skip "ooooooooook" response
trashHeader := []byte{0}
for {
_, err := io.ReadFull(lazyRawDownload, trashHeader)
if err != nil {
return nil, newError("failed to read initial response").Base(err)
}
if trashHeader[0] == 'k' {
break
}
}
return downResponse, nil

return lazyRawDownload, nil
},
}

// necessary in order to send larger chunks in upload
bufferedUploadPipeWriter := buf.NewBufferedWriter(uploadPipeWriter)
bufferedUploadPipeWriter.SetBuffered(false)

conn := splitConn{
writer: bufferedUploadPipeWriter,
reader: lazyDownload,
2 changes: 1 addition & 1 deletion transport/internet/splithttp/hub.go
Original file line number Diff line number Diff line change
@@ -163,7 +163,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
writer.Header().Set("X-Accel-Buffering", "no")
// magic header to make the HTTP middle box consider this as SSE to disable buffer
writer.Header().Set("Content-Type", "text/event-stream")

writer.WriteHeader(http.StatusOK)
// send a chunk immediately to enable CDN streaming.
// many CDN buffer the response headers until the origin starts sending
40 changes: 19 additions & 21 deletions transport/internet/websocket/connection.go
Original file line number Diff line number Diff line change
@@ -11,25 +11,23 @@ import (
"github.com/xtls/xray-core/common/serial"
)

var _ buf.Writer = (*connection)(nil)
var _ buf.Writer = (*Connection)(nil)

// connection is a wrapper for net.Conn over WebSocket connection.
type connection struct {
conn *websocket.Conn
reader io.Reader
remoteAddr net.Addr
type Connection struct {
conn *websocket.Conn
reader io.Reader
}

func newConnection(conn *websocket.Conn, remoteAddr net.Addr, extraReader io.Reader) *connection {
return &connection{
conn: conn,
remoteAddr: remoteAddr,
reader: extraReader,
func NewConnection(conn *websocket.Conn, remoteAddr net.Addr, extraReader io.Reader) *Connection {
return &Connection{
conn: conn,
reader: extraReader,
}
}

// Read implements net.Conn.Read()
func (c *connection) Read(b []byte) (int, error) {
func (c *Connection) Read(b []byte) (int, error) {
for {
reader, err := c.getReader()
if err != nil {
@@ -45,7 +43,7 @@ func (c *connection) Read(b []byte) (int, error) {
}
}

func (c *connection) getReader() (io.Reader, error) {
func (c *Connection) getReader() (io.Reader, error) {
if c.reader != nil {
return c.reader, nil
}
@@ -59,21 +57,21 @@ func (c *connection) getReader() (io.Reader, error) {
}

// Write implements io.Writer.
func (c *connection) Write(b []byte) (int, error) {
func (c *Connection) Write(b []byte) (int, error) {
if err := c.conn.WriteMessage(websocket.BinaryMessage, b); err != nil {
return 0, err
}
return len(b), nil
}

func (c *connection) WriteMultiBuffer(mb buf.MultiBuffer) error {
func (c *Connection) WriteMultiBuffer(mb buf.MultiBuffer) error {
mb = buf.Compact(mb)
mb, err := buf.WriteMultiBuffer(c, mb)
buf.ReleaseMulti(mb)
return err
}

func (c *connection) Close() error {
func (c *Connection) Close() error {
var errors []interface{}
if err := c.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second*5)); err != nil {
errors = append(errors, err)
@@ -87,25 +85,25 @@ func (c *connection) Close() error {
return nil
}

func (c *connection) LocalAddr() net.Addr {
func (c *Connection) LocalAddr() net.Addr {
return c.conn.LocalAddr()
}

func (c *connection) RemoteAddr() net.Addr {
return c.remoteAddr
func (c *Connection) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}

func (c *connection) SetDeadline(t time.Time) error {
func (c *Connection) SetDeadline(t time.Time) error {
if err := c.SetReadDeadline(t); err != nil {
return err
}
return c.SetWriteDeadline(t)
}

func (c *connection) SetReadDeadline(t time.Time) error {
func (c *Connection) SetReadDeadline(t time.Time) error {
return c.conn.SetReadDeadline(t)
}

func (c *connection) SetWriteDeadline(t time.Time) error {
func (c *Connection) SetWriteDeadline(t time.Time) error {
return c.conn.SetWriteDeadline(t)
}
60 changes: 7 additions & 53 deletions transport/internet/websocket/dialer.go
Original file line number Diff line number Diff line change
@@ -1,54 +1,23 @@
package websocket

import (
"bytes"
"context"
_ "embed"
"encoding/base64"
"io"
gonet "net"
"net/http"
"time"

"github.com/gorilla/websocket"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/platform"
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/common/uuid"
"github.com/xtls/xray-core/transport/internet"
"github.com/xtls/xray-core/transport/internet/browser_dialer"
"github.com/xtls/xray-core/transport/internet/stat"
"github.com/xtls/xray-core/transport/internet/tls"
)

//go:embed dialer.html
var webpage []byte

var conns chan *websocket.Conn

func init() {
addr := platform.NewEnvFlag(platform.BrowserDialerAddress).GetValue(func() string { return "" })
if addr != "" {
token := uuid.New()
csrfToken := token.String()
webpage = bytes.ReplaceAll(webpage, []byte("csrfToken"), []byte(csrfToken))
conns = make(chan *websocket.Conn, 256)
go http.ListenAndServe(addr, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/websocket" {
if r.URL.Query().Get("token") == csrfToken {
if conn, err := upgrader.Upgrade(w, r, nil); err == nil {
conns <- conn
} else {
newError("Browser dialer http upgrade unexpected error").AtError().WriteToLog()
}
}
} else {
w.Write(webpage)
}
}))
}
}

// Dial dials a WebSocket connection to the given destination.
func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) {
newError("creating connection to ", dest).WriteToLog(session.ExportIDToError(ctx))
@@ -124,28 +93,13 @@ func dialWebSocket(ctx context.Context, dest net.Destination, streamSettings *in
}
uri := protocol + "://" + host + wsSettings.GetNormalizedPath()

if conns != nil {
data := []byte(uri)
if ed != nil {
data = append(data, " "+base64.RawURLEncoding.EncodeToString(ed)...)
}
var conn *websocket.Conn
for {
conn = <-conns
if conn.WriteMessage(websocket.TextMessage, data) != nil {
conn.Close()
} else {
break
}
}
if _, p, err := conn.ReadMessage(); err != nil {
conn.Close()
if browser_dialer.HasBrowserDialer() {
conn, err := browser_dialer.DialWS(uri, ed)
if err != nil {
return nil, err
} else if s := string(p); s != "ok" {
conn.Close()
return nil, newError(s)
}
return newConnection(conn, conn.RemoteAddr(), nil), nil

return NewConnection(conn, conn.RemoteAddr(), nil), nil
}

header := wsSettings.GetRequestHeader()
@@ -163,7 +117,7 @@ func dialWebSocket(ctx context.Context, dest net.Destination, streamSettings *in
return nil, newError("failed to dial to (", uri, "): ", reason).Base(err)
}

return newConnection(conn, conn.RemoteAddr(), nil), nil
return NewConnection(conn, conn.RemoteAddr(), nil), nil
}

type delayDialConn struct {
59 changes: 0 additions & 59 deletions transport/internet/websocket/dialer.html

This file was deleted.

2 changes: 1 addition & 1 deletion transport/internet/websocket/hub.go
Original file line number Diff line number Diff line change
@@ -73,7 +73,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
}
}

h.ln.addConn(newConnection(conn, remoteAddr, extraReader))
h.ln.addConn(NewConnection(conn, remoteAddr, extraReader))
}

type Listener struct {

0 comments on commit 2347da1

Please sign in to comment.